fdddf59f387d20e2893156005688bb4d2c4cea77,gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/simulation/dual/DualSimulation.java,DualSimulation,simulateBulk,#DataSet#,178
Before Change
*/
private DataSet<FatVertex> simulateBulk(DataSet<FatVertex> vertices) {
// Builds the bulk-iteration data flow: each pass validates the updated fat
// vertices, turns the resulting deletions into per-group messages and hands
// those on to the remainder of the iteration body.
// Debug only: route the input through a printing mapper; the vertex and edge
// mappings are broadcast to that mapper.
if (LOG.isDebugEnabled()) {
vertices = vertices
.map(new PrintFatVertex(false, "iteration start"))
.withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
.withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
}
// ITERATION HEAD
// Integer.MAX_VALUE is passed as the iteration limit, i.e. no practical cap
// is set here.
// NOTE(review): presumably the loop is ended by a termination criterion or an
// empty work set defined further down - confirm.
IterativeDataSet<FatVertex> workSet = vertices.iterate(Integer.MAX_VALUE);
// ITERATION BODY
// validate neighborhood of each vertex and create deletions
// Only vertices accepted by the UpdatedFatVertices filter are validated
// against the query.
DataSet<Deletion> deletions = workSet
.filter(new UpdatedFatVertices())
.flatMap(new ValidateNeighborhood(getQuery()));
// Debug only: print every generated deletion.
if (LOG.isDebugEnabled()) {
deletions = deletions
.map(new PrintDeletion(true, "deletion"))
.withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
.withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
}
// combine deletions into messages
// Grouping key is field 0 of Deletion.
// NOTE(review): presumably the id of the vertex the deletion is addressed to -
// confirm against the Deletion type.
DataSet<Message> combinedMessages = deletions
.groupBy(0)
.combineGroup(new CombinedMessages());
// Debug only: print the combined messages.
if (LOG.isDebugEnabled()) {
combinedMessages = combinedMessages
.map(new PrintMessage(true, "combined"))
.withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
.withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
}
// group the combined messages into the final message per key
// The combined messages are grouped on field 0 again and reduced by
// GroupedMessages.
DataSet<Message> messages = combinedMessages
.groupBy(0)
.reduceGroup(new GroupedMessages());
After Change
*/
private DataSet<FatVertex> simulateBulk(DataSet<FatVertex> vertices) {
// Builds the bulk-iteration data flow: each pass validates the updated fat
// vertices, turns the resulting deletions into per-group messages and hands
// those on to the remainder of the iteration body.
// log(...) is handed the data set, a printing mapper and the vertex/edge
// mappings, and its result replaces the data set.
// NOTE(review): presumably it attaches the printer only when debug logging is
// enabled - confirm against the log() helper.
vertices = log(vertices, new PrintFatVertex(false, "iteration start"),
getVertexMapping(), getEdgeMapping());
// ITERATION HEAD
// Integer.MAX_VALUE is passed as the iteration limit, i.e. no practical cap
// is set here.
// NOTE(review): presumably the loop is ended by a termination criterion or an
// empty work set defined further down - confirm.
IterativeDataSet<FatVertex> workSet = vertices.iterate(Integer.MAX_VALUE);
// ITERATION BODY
// validate neighborhood of each vertex and create deletions
// Only vertices accepted by the UpdatedFatVertices filter are validated
// against the query.
DataSet<Deletion> deletions = workSet
.filter(new UpdatedFatVertices())
.flatMap(new ValidateNeighborhood(getQuery()));
deletions = log(deletions, new PrintDeletion(true, "deletion"),
getVertexMapping(), getEdgeMapping());
// combine deletions into messages
// Grouping key is field 0 of Deletion.
// NOTE(review): presumably the id of the vertex the deletion is addressed to -
// confirm against the Deletion type.
DataSet<Message> combinedMessages = deletions
.groupBy(0)
.combineGroup(new CombinedMessages());
combinedMessages = log(combinedMessages, new PrintMessage(true, "combined"),
getVertexMapping(), getEdgeMapping());
// group the combined messages into the final message per key
// The combined messages are grouped on field 0 again and reduced by
// GroupedMessages.
DataSet<Message> messages = combinedMessages
.groupBy(0)
.reduceGroup(new GroupedMessages());